package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.HotArticleService;
import com.heima.common.constants.article.HotArticleConstants;
import com.heima.model.article.mess.ArticleVisitStreamMess;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka consumer listener: invokes the business logic that recomputes an article's score.
 */
@Component
public class HotArticleKafkaListener {

    @Autowired
    private HotArticleService hotArticleService;

    /**
     * Consumes one record from {@link HotArticleConstants#HOT_ARTICLE_INCR_HANDLE_TOPIC},
     * deserializes its value into an {@link ArticleVisitStreamMess} and passes it to
     * {@link HotArticleService#recomputeScore}.
     *
     * <p>Records that carry no usable payload (a {@code null} record, a {@code null} or empty
     * value, or a value that deserializes to {@code null}) are skipped so that the service is
     * never invoked with a {@code null} message. Parse failures on malformed JSON are not caught
     * here and propagate to the caller.
     *
     * @param consumerRecord the Kafka record whose value is expected to be the JSON form of an
     *                       {@link ArticleVisitStreamMess}
     */
    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void hotArticleScoreUpdate(ConsumerRecord<String,String> consumerRecord){
        if (consumerRecord == null) {
            return;
        }

        String payload = consumerRecord.value();
        // Only parse when there is something to parse; a null/empty value cannot describe a visit.
        ArticleVisitStreamMess articleVisitStreamMess = (payload == null || payload.isEmpty())
                ? null
                : JSON.parseObject(payload, ArticleVisitStreamMess.class);

        // A null result is also possible for a non-empty payload (e.g. the literal JSON "null").
        if (articleVisitStreamMess == null) {
            System.out.println("[kafka consumer] skipping record without a usable payload: topic="
                    + consumerRecord.topic()
                    + ", partition=" + consumerRecord.partition()
                    + ", offset=" + consumerRecord.offset());
            return;
        }

        System.out.println("【kafka消费者更新分值开始执行。。。。。。。。】" + payload);
        // Call the business method that performs the score update.
        hotArticleService.recomputeScore(articleVisitStreamMess);
        System.out.println("【kafka消费者更新分值执行结束。。。。。。。。】");
    }

}
